#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

"""This module groups util function used in pipelines."""
from __future__ import annotations

import sys
from pathlib import Path

import asyncclick as click
from dagger import DaggerError
from pipelines import consts, main_logger
from pipelines.consts import GCS_PUBLIC_DOMAIN, STATIC_REPORT_PREFIX
from pipelines.helpers import sentry_utils
from pipelines.helpers.gcs import upload_to_gcs
from pipelines.helpers.utils import slugify


class DaggerPipelineCommand(click.Command):
    @sentry_utils.with_command_context
    async def invoke(self, ctx: click.Context) -> None:
        """Wrap parent invoke in a try catch suited to handle pipeline failures.
        Args:
            ctx (click.Context): The invocation context.
        Raises:
            e: Raise whatever exception that was caught.
        """
        command_name = self.name
        main_logger.info(f"Running Dagger Command {command_name}...")
        main_logger.info(
            "If you're running this command for the first time the Dagger engine image will be pulled, it can take a short minute..."
        )
        ctx.obj["report_output_prefix"] = self.render_report_output_prefix(ctx)
        dagger_logs_gcs_key = f"{ctx.obj['report_output_prefix']}/dagger-logs.txt"
        try:
            if not ctx.obj["show_dagger_logs"]:
                dagger_log_dir = Path(f"{consts.LOCAL_REPORTS_PATH_ROOT}/{ctx.obj['report_output_prefix']}")
                dagger_log_path = Path(f"{dagger_log_dir}/dagger.log").resolve()
                ctx.obj["dagger_logs_path"] = dagger_log_path
                main_logger.info(f"Saving dagger logs to: {dagger_log_path}")
                if ctx.obj["is_ci"]:
                    ctx.obj["dagger_logs_url"] = f"{GCS_PUBLIC_DOMAIN}/{ctx.obj['ci_report_bucket_name']}/{dagger_logs_gcs_key}"
                else:
                    ctx.obj["dagger_logs_url"] = None
            else:
                ctx.obj["dagger_logs_path"] = None
            pipeline_success = await super().invoke(ctx)
            if not pipeline_success:
                raise DaggerError(f"Dagger Command {command_name} failed.")
        except DaggerError as e:
            main_logger.error(f"Dagger Command {command_name} failed", exc_info=e)
            sys.exit(1)
        finally:
            if ctx.obj.get("dagger_logs_path"):
                main_logger.info(f"Dagger logs saved to {ctx.obj['dagger_logs_path']}")
                if ctx.obj["is_ci"] and ctx.obj["ci_gcp_credentials"] and ctx.obj["ci_report_bucket_name"]:
                    gcs_uri, public_url = upload_to_gcs(
                        ctx.obj["dagger_logs_path"],
                        ctx.obj["ci_report_bucket_name"],
                        dagger_logs_gcs_key,
                        ctx.obj["ci_gcp_credentials"].value,
                    )
                    main_logger.info(f"Dagger logs saved to {gcs_uri}. Public URL: {public_url}")

    @staticmethod
    def render_report_output_prefix(ctx: click.Context) -> str:
        """Render the report output prefix for any command in the Connector CLI.

        The goal is to standardize the output of all logs and reports generated by the CLI
        related to a specific command, and to a specific CI context.

        Note: We cannot hoist this higher in the command hierarchy because only one level of
        subcommands are available at the time the context is created.
        """

        git_branch = ctx.obj["git_branch"]
        git_revision = ctx.obj["git_revision"]
        pipeline_start_timestamp = ctx.obj["pipeline_start_timestamp"]
        ci_context = ctx.obj["ci_context"]
        ci_job_key = ctx.obj["ci_job_key"] if ctx.obj.get("ci_job_key") else ci_context

        sanitized_branch = slugify(git_branch.replace("/", "_"))

        # get the command name for the current context, if a group then prepend the parent command name
        if ctx.command_path:
            cmd_components = ctx.command_path.split(" ")
            cmd_components[0] = STATIC_REPORT_PREFIX
            cmd = "/".join(cmd_components)
        else:
            cmd = None

        path_values = [
            cmd,
            ci_job_key,
            sanitized_branch,
            pipeline_start_timestamp,
            git_revision,
        ]

        # check all values are defined
        if None in path_values:
            raise ValueError(f"Missing value required to render the report output prefix: {path_values}")

        # join all values with a slash, and convert all values to string
        return "/".join(map(str, path_values))
